package com.gzyj.flink.source.mysql;

import com.gzyj.flink.veh.AnbiaoVehicle;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlVehicleSource extends RichSourceFunction<AnbiaoVehicle> {
    private PreparedStatement ps;
    private Connection connection;
    private boolean stop = false;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            ExecutionConfig.GlobalJobParameters globalJobParam=getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            Configuration globconf=(Configuration)globalJobParam;
            this.connection = DriverManager.getConnection(globconf.getString("mysql-url",null),
                    globconf.getString("mysql-user","root"),
                    globconf.getString("mysql-password","123456"));
            this.ps = this.connection.prepareStatement("select * from anbiao_vehicle where is_deleted=0 or is_deleted is null",ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);
            this.ps.setFetchSize(50*1000);
            this.ps.setMaxRows(50*1000);
            this.ps.setFetchDirection(ResultSet.FETCH_REVERSE);
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }


    }

    @Override
    public void run(SourceContext<AnbiaoVehicle> sourceContext) throws Exception {

        while (!this.stop) {
            ResultSet rs = this.ps.executeQuery();
            while (rs.next()) {

                AnbiaoVehicle anbiaoVehicle = new AnbiaoVehicle();

                anbiaoVehicle.setId(rs.getString("id"));
                anbiaoVehicle.setDeptId(rs.getLong("dept_id"));
                anbiaoVehicle.setCheliangpaizhao(rs.getString("cheliangpaizhao"));
                anbiaoVehicle.setChepaiyanse(rs.getString("chepaiyanse"));
                anbiaoVehicle.setShiyongxingzhi(rs.getString("shiyongxingzhi"));
                anbiaoVehicle.setJiashiyuanid(rs.getString("jiashiyuanid"));
                anbiaoVehicle.setChangpai(rs.getString("changpai"));
                anbiaoVehicle.setXinghao(rs.getString("xinghao"));
                anbiaoVehicle.setChejiahao(rs.getString("chejiahao"));
                anbiaoVehicle.setLuntaiguige(rs.getString("luntaiguige"));
                anbiaoVehicle.setCheshenyanse(rs.getString("cheshenyanse"));
                anbiaoVehicle.setHedingzaike(rs.getString("hedingzaike"));
                anbiaoVehicle.setYingyunnianxian(rs.getString("yingyunnianxian"));
                anbiaoVehicle.setDengjizhengshubianhao(rs.getString("dengjizhengshubianhao"));
                anbiaoVehicle.setChelianglaiyuan(rs.getString("chelianglaiyuan"));
                anbiaoVehicle.setZhucedengjishijian(rs.getString("zhucedengjishijian"));
                anbiaoVehicle.setRuhushijian(rs.getString("ruhushijian"));
                anbiaoVehicle.setGuohushijian(rs.getString("guohushijian"));
                anbiaoVehicle.setTuishishijian(rs.getString("tuishishijian"));
                anbiaoVehicle.setQiangzhibaofeishijian(rs.getString("qiangzhibaofeishijian"));
                anbiaoVehicle.setJieboyunshuzhenghao(rs.getString("jieboyunshuzhenghao"));
                anbiaoVehicle.setYuancheliangpaizhao(rs.getString("yuancheliangpaizhao"));
                anbiaoVehicle.setCheliangzhuangtai(rs.getInt("cheliangzhuangtai"));
                anbiaoVehicle.setCheliangtingfangdiqu(rs.getString("cheliangtingfangdiqu"));
                anbiaoVehicle.setDanganhao(rs.getString("danganhao"));
                anbiaoVehicle.setBeiyongcheliang(rs.getString("beiyongcheliang"));
                anbiaoVehicle.setYunyingshang(rs.getString("yunyingshang"));
                anbiaoVehicle.setSuoshuchedui(rs.getString("suoshuchedui"));
                anbiaoVehicle.setXingshifujian(rs.getString("xingshifujian"));
                anbiaoVehicle.setFujian(rs.getString("fujian"));
                anbiaoVehicle.setFadongjixinghao(rs.getString("fadongjixinghao"));
                anbiaoVehicle.setFadongjihao(rs.getString("fadongjihao"));
                anbiaoVehicle.setFadongjipailianggonglv(rs.getString("fadongjipailianggonglv"));
                anbiaoVehicle.setRanliaoleibie(rs.getString("ranliaoleibie"));
                anbiaoVehicle.setRanyouxiaohao(rs.getString("ranyouxiaohao"));
                anbiaoVehicle.setPaifangbiaozhun(rs.getString("paifangbiaozhun"));
                anbiaoVehicle.setZhuanxiangfangshi(rs.getString("zhuanxiangfangshi"));
                anbiaoVehicle.setChemenshezhi(rs.getString("chemenshezhi"));
                anbiaoVehicle.setZhouju(rs.getString("zhouju"));
                anbiaoVehicle.setChechang(rs.getString("chechang"));
                anbiaoVehicle.setChekuan(rs.getString("chekuan"));
                anbiaoVehicle.setChegao(rs.getString("chegao"));
                anbiaoVehicle.setLuntaishu(rs.getString("luntaishu"));
                anbiaoVehicle.setChezhoushu(rs.getString("chezhoushu"));
                anbiaoVehicle.setGangbantanhuangpianshu(rs.getString("gangbantanhuangpianshu"));
                anbiaoVehicle.setDipanxinghao(rs.getString("dipanxinghao"));
                anbiaoVehicle.setDonglileixing(rs.getString("donglileixing"));
                anbiaoVehicle.setZongzhiliang(rs.getString("zongzhiliang"));
                anbiaoVehicle.setZhengbeizhiliang(rs.getString("zhengbeizhiliang"));
                anbiaoVehicle.setLuntaizonglei(rs.getString("luntaizonglei"));
                anbiaoVehicle.setXuanguaxingshi(rs.getString("xuanguaxingshi"));
                anbiaoVehicle.setXingchezhidongfangshi(rs.getString("xingchezhidongfangshi"));
                anbiaoVehicle.setZhidongqiqianlun(rs.getString("zhidongqiqianlun"));
                anbiaoVehicle.setZhidongqihoulun(rs.getString("zhidongqihoulun"));
                anbiaoVehicle.setAbs(rs.getString("abs"));
                anbiaoVehicle.setKongtiaoxitong(rs.getString("kongtiaoxitong"));
                anbiaoVehicle.setHuanshuqi(rs.getString("huanshuqi"));
                anbiaoVehicle.setBiansuxiangxingshi(rs.getString("biansuxiangxingshi"));
                anbiaoVehicle.setZhizhaochangshang(rs.getString("zhizhaochangshang"));
                anbiaoVehicle.setGouzhishuizhenghao(rs.getString("gouzhishuizhenghao"));
                anbiaoVehicle.setChuchangriqi(rs.getString("chuchangriqi"));
                anbiaoVehicle.setLeijilicheng(rs.getString("leijilicheng"));
                anbiaoVehicle.setZhongduanfuwuqi(rs.getString("zhongduanfuwuqi"));
                anbiaoVehicle.setCheliangdengji(rs.getString("cheliangdengji"));
                anbiaoVehicle.setWeishengjian(rs.getString("weishengjian"));
                anbiaoVehicle.setFadongjipailiang(rs.getString("fadongjipailiang"));
                anbiaoVehicle.setCheliangwaikuochicun(rs.getString("cheliangwaikuochicun"));
                anbiaoVehicle.setRanliaoxiaohaofujian(rs.getString("ranliaoxiaohaofujian"));
                anbiaoVehicle.setBeizhu(rs.getString("beizhu"));
                anbiaoVehicle.setGpsanzhuangshijian(rs.getString("gpsanzhuangshijian"));
                anbiaoVehicle.setZhinenghuaxitong(rs.getString("zhinenghuaxitong"));
                anbiaoVehicle.setGps(rs.getString("gps"));
                anbiaoVehicle.setXingshijiluyi(rs.getString("xingshijiluyi"));
                anbiaoVehicle.setZongduanid(rs.getString("zongduanid"));
                anbiaoVehicle.setZongduanxinghao(rs.getString("zongduanxinghao"));
                anbiaoVehicle.setIsDeleted(rs.getInt("is_deleted"));
                anbiaoVehicle.setCreatetime(rs.getDate("createtime"));
                anbiaoVehicle.setCaozuoren(rs.getString("caozuoren"));
                anbiaoVehicle.setCaozuorenid(rs.getLong("caozuorenid"));
                anbiaoVehicle.setCaozuoshijian(rs.getDate("caozuoshijian"));
                anbiaoVehicle.setCheliangzhaopian(rs.getString("cheliangzhaopian"));
                anbiaoVehicle.setYunshujiezhi(rs.getString("yunshujiezhi"));
                anbiaoVehicle.setCheliangyunyingleixing(rs.getString("cheliangyunyingleixing"));
                anbiaoVehicle.setZhongduanchangshang(rs.getString("zhongduanchangshang"));
                anbiaoVehicle.setSimnum(rs.getString("simnum"));
                anbiaoVehicle.setTongbushijian(rs.getDate("tongbushijian"));
                anbiaoVehicle.setYunyingshangmingcheng(rs.getString("yunyingshangmingcheng"));
                anbiaoVehicle.setJiashiyuanxingming(rs.getString("jiashiyuanxingming"));
                anbiaoVehicle.setJiashiyuandianhua(rs.getString("jiashiyuandianhua"));
                anbiaoVehicle.setYayunyuanxingming(rs.getString("yayunyuanxingming"));
                anbiaoVehicle.setYayunyuandianhua(rs.getString("yayunyuandianhua"));
                anbiaoVehicle.setChezhu(rs.getString("chezhu"));
                anbiaoVehicle.setChezhudianhua(rs.getString("chezhudianhua"));
                anbiaoVehicle.setZhongduanleixing(rs.getInt("zhongduanleixing"));
                anbiaoVehicle.setYayunyuanid(rs.getString("yayunyuanid"));
                anbiaoVehicle.setTerminalprotocoltype(rs.getString("terminalprotocoltype"));
                anbiaoVehicle.setVideochannelnum(rs.getString("videochannelnum"));
                anbiaoVehicle.setPlatformconnectiontype(rs.getString("platformconnectiontype"));
                anbiaoVehicle.setTeamno(rs.getString("teamno"));
                anbiaoVehicle.setOwenno(rs.getString("owenno"));
                anbiaoVehicle.setDaoluyunshuzheng(rs.getString("daoluyunshuzheng"));
                anbiaoVehicle.setDaoluyunshuzhengchulingriqi(rs.getString("daoluyunshuzhengchulingriqi"));
                anbiaoVehicle.setDaoluyunshuzhengyouxiaoqi(rs.getString("daoluyunshuzhengyouxiaoqi"));
                anbiaoVehicle.setBencinianshenriqi(rs.getString("bencinianshenriqi"));
                anbiaoVehicle.setXiacinianshenriqi(rs.getString("xiacinianshenriqi"));
                anbiaoVehicle.setBencinianjianriqi(rs.getString("bencinianjianriqi"));
                anbiaoVehicle.setXiacinianjianriqi(rs.getString("xiacinianjianriqi"));
                anbiaoVehicle.setBencijipingriqi(rs.getString("bencijipingriqi"));
                anbiaoVehicle.setXiacijipingriqi(rs.getString("xiacijipingriqi"));
                anbiaoVehicle.setBaofeiriqi(rs.getString("baofeiriqi"));
                anbiaoVehicle.setCheliangjishudengji(rs.getString("cheliangjishudengji"));
                anbiaoVehicle.setSuoshuyunguan(rs.getString("suoshuyunguan"));
                anbiaoVehicle.setArea(rs.getString("area"));
                anbiaoVehicle.setDaoluyunzhengfujian(rs.getString("daoluyunzhengfujian"));
                anbiaoVehicle.setDabiaojianchafujian(rs.getString("dabiaojianchafujian"));
                anbiaoVehicle.setNianshenfujian(rs.getString("nianshenfujian"));
                anbiaoVehicle.setXingshizhengzhengyefujian(rs.getString("xingshizhengzhengyefujian"));
                anbiaoVehicle.setXingshizhengfuyefujian(rs.getString("xingshizhengfuyefujian"));
                anbiaoVehicle.setCheliangdengjizhengshuzhengyefujian(rs.getString("cheliangdengjizhengshuzhengyefujian"));
                anbiaoVehicle.setCheliangdengjizhengshufuyefujian(rs.getString("cheliangdengjizhengshufuyefujian"));
                anbiaoVehicle.setBaoxiandaoqishijian(rs.getString("baoxiandaoqishijian"));
                anbiaoVehicle.setXingshizhengdaoqishijian(rs.getString("xingshizhengdaoqishijian"));
                anbiaoVehicle.setCarowneraddress(rs.getString("carowneraddress"));
                anbiaoVehicle.setZhunqianyinzongzhiliang(rs.getString("zhunqianyinzongzhiliang"));
                anbiaoVehicle.setHedingzaizhiliang(rs.getString("hedingzaizhiliang"));
                anbiaoVehicle.setHuoxiangneibuchicun(rs.getString("huoxiangneibuchicun"));
                anbiaoVehicle.setYunyingshangjieruma(rs.getString("yunyingshangjieruma"));


                sourceContext.collect(anbiaoVehicle);


            }


            Thread.sleep(30 * 1000 * 60);
        }


    }

    @Override
    public void cancel() {
        this.stop = true;
        try {

            if (connection != null) { //关闭连接和释放资源
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {

        }

    }
}
